package com.stars.easyms.schedule.core;

import com.stars.easyms.schedule.bean.DbScheduleServer;
import com.stars.easyms.schedule.enums.YesOrNo;
import com.stars.easyms.schedule.enums.Switch;
import com.stars.easyms.schedule.service.DistributedScheduleService;

import java.util.HashMap;
import java.util.Map;

/**
 * 分布式调度框架的拉取服务器配置线程：
 *      使用数据库作为心跳服务器的优势是不受其他独立中间件的影响，弊端就是没有推送且实时性不能保证
 *      该类处理拉取服务器配置的工作，对于该分布式调度框架而言使用数据库作为心跳服务器不是最优方法但是是最稳定方法
 *      后续会新增对其他插件的支持并为可选式
 *
 * @author guoguifang
 */
class DistributedSchedulePuller extends AbstractDistributedScheduleThread {

    @Override
    public void execute() {

        // 拉取当前服务器的最新配置信息，如果有与当前配置不同的地方部署最新的配置信息
        pullAndDeployServerConfig();

        // 将所有心跳停止超过30秒的服务器置为不存活状态
        if (updateDeadSeverStatus() > 0) {
            // 如果有死亡状态的服务器则把死亡状态的服务器执行的子任务置为'阻塞待处理'状态
            if (DistributedScheduler.getSingleInstance().maintainOutOfWorkSubTask(false) > 0) {
                DistributedScheduleCatcher.getSingleInstance().signalIdle();
            }
        }

        // 拉取间隔
        awaitIdleMillis(3000);
    }

    /**
     * 拉取当前服务器的最新配置信息，如果有与当前配置不同的地方部署最新的配置信息
     */
    private void pullAndDeployServerConfig() {
        // 当前服务器当前配置信息
        DbScheduleServer currentServerConfig = getDistributedScheduleServerConfig();

        // 拉取当前服务器的最新配置信息
        DbScheduleServer latestServerConfig = DistributedScheduleService.getInstance().getDbScheduleServerConfig(currentServerConfig);
        if (latestServerConfig == null) {
            DistributedScheduleService.getInstance().insertDbScheduleServer(currentServerConfig);
            return;
        }

        // 判断服务器标识是否改变
        if (!currentServerConfig.getServerId().equals(latestServerConfig.getServerId())) {
            currentServerConfig.setServerId(latestServerConfig.getServerId());
        }

        // 判断核心线程池大小、最大线程池大小是否修改
        int updatePoolSizeLevel = 0;
        if (!currentServerConfig.getCorePoolSize().equals(latestServerConfig.getCorePoolSize())) {
            updatePoolSizeLevel++;
        }
        if (!currentServerConfig.getMaxPoolSize().equals(latestServerConfig.getMaxPoolSize())) {
            updatePoolSizeLevel += 2;
        }
        if (updatePoolSizeLevel == 1) {
            DistributedScheduleManager.getSingleInstance().setCorePoolSize(latestServerConfig.getCorePoolSize());
        } else if (updatePoolSizeLevel == 2) {
            DistributedScheduleManager.getSingleInstance().setMaxPoolSize(latestServerConfig.getMaxPoolSize());
        } else if (updatePoolSizeLevel == 3) {
            DistributedScheduleManager.getSingleInstance().setPoolSize(latestServerConfig.getCorePoolSize(), latestServerConfig.getMaxPoolSize());
        }

        // 判断空闲线程存活时间是否修改
        if (!currentServerConfig.getKeepAliveTimeMilli().equals(latestServerConfig.getKeepAliveTimeMilli())) {
            DistributedScheduleManager.getSingleInstance().setKeepAliveTime(latestServerConfig.getKeepAliveTimeMilli());
        }

        // 判断调度器开关是否改变
        Switch currentScheduleSwitch = Switch.forCode(currentServerConfig.getScheduleSwitch());
        Switch latestScheduleSwitch = Switch.forCode(latestServerConfig.getScheduleSwitch());
        if (latestScheduleSwitch != null && currentScheduleSwitch != latestScheduleSwitch) {
            currentServerConfig.setScheduleSwitch(latestScheduleSwitch.getCode());
            if (latestScheduleSwitch == Switch.OPEN) {
                DistributedScheduleManager.getSingleInstance().startSchedule();
            } else {
                DistributedScheduleManager.getSingleInstance().stopSchedule();
            }
        }

        // 如果需要唤醒调度器则立即唤醒调度器并重置
        if (YesOrNo.forCode(latestServerConfig.getWakeup()) == YesOrNo.YES) {
            DistributedScheduleManager.getSingleInstance().setDbScheduleConfig(DistributedScheduleService.getInstance().getDbScheduleConfig());
            DistributedScheduler.getSingleInstance().needInitMismatchTask();
            latestServerConfig.setWakeup(YesOrNo.NO.getCode());
            DistributedScheduleService.getInstance().updateDbScheduleServer(latestServerConfig);
        }
    }

    private Map<String, Object> updateDeadSeverStatusMap;

    /**
     * 更新已死亡服务器状态
     * @return 本地修改成功数量
     */
    private int updateDeadSeverStatus() {
        if (updateDeadSeverStatusMap == null) {
            updateDeadSeverStatusMap = new HashMap<>(8);
            updateDeadSeverStatusMap.put("deathInterval", 30);
            updateDeadSeverStatusMap.put("alive_no", YesOrNo.NO.getCode());
            updateDeadSeverStatusMap.put("alive_yes", YesOrNo.YES.getCode());
        }
        return DistributedScheduleService.getInstance().updateDeadSeverStatus(updateDeadSeverStatusMap);
    }

    private static DistributedSchedulePuller singleInstance;

    static DistributedSchedulePuller getSingleInstance() {
        if (singleInstance == null) {
            synchronized (DistributedSchedulePuller.class) {
                if (singleInstance == null) {
                    singleInstance = new DistributedSchedulePuller();
                }
            }
        }
        return singleInstance;
    }

    private DistributedSchedulePuller(){
        super();
    }

}
